Introduction

This is a short and simple tutorial on SparkR. We introduce SparkR and show a few simple examples.

You can find more information on the pages listed below.

What is R?

R is a programming language that is very popular among Data Scientists (a Data Scientist being a better Computer Scientist than the average Statistician, and a better Statistician than the average Computer Scientist).

Magic of R

  • Open Source
  • Rich in statistical, graphics and general-purpose packages
  • One can manipulate R objects directly in C, C++, Fortran, Java
  • It can produce publication quality documents

Limitations of R

  • Single threaded
  • Everything has to fit in memory

How do Data Scientists work with R?

  • Distributed Storages: Hadoop, Mesos, AWS S3, Cassandra…
  • Framework: Hadoop MR, Hive, Pig,…
  • Local Storages: CSV, database, …

What are Spark and SparkR?

Spark:

Framework for cluster computing (you can use it with Java, Scala, Python, …)

SparkR

(spark+R): Framework for cluster computing using R

SparkR architecture

1. SparkR initialization

Let's set a variable that stores the location of Spark.

# Path to the local Spark installation; adjust for your machine.
# The commented alternatives read it from the environment or use another host's path.
## SPARK_HOME <- Sys.getenv("SPARK_HOME")
SPARK_HOME <- "/home/bartek/programs/spark-1.5.2-bin-hadoop2.6/"
## SPARK_HOME <- "/Users/CotePelaez/Documents/spark-1.5.2-bin-hadoop2.6/"

1.1 Load the SparkR package

# Make the SparkR package (shipped inside SPARK_HOME under R/lib) visible to library().
.libPaths(c(.libPaths(), file.path(SPARK_HOME,"R/lib/")))
# Tell sparkr-shell to fetch the spark-csv package, used later by read.df()
# with source = "com.databricks.spark.csv".
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"')
library(SparkR)

1.2 Load additional packages

# rJava: presumably required by the Java backend — not called directly below.
# ggplot2: plotting (section 3.3); pipeR: the %>>% pipe operator used throughout;
# whisker: template rendering for parameterized SQL queries (Question 4).
library(rJava)
library(ggplot2)
library(pipeR)
library(whisker)

1.3 Initialization context

For educational purposes, Spark can be run on a single local machine. We get that by setting the master to "local".

# Start a Spark context; master = "local" runs everything on this machine.
sc <- sparkR.init(master = "local", sparkHome = SPARK_HOME)
## Launching java with spark-submit command /home/bartek/programs/spark-1.5.2-bin-hadoop2.6//bin/spark-submit   "--packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell" /tmp/RtmpUbpn6I/backend_port351e74d4cdf5
# Hive context: enables SQL queries and the temporary tables used below.
hiveContext <- sparkRHive.init(sc)

Now, we can get access to Spark UI at http://localhost:4040

2. Introducing DataFrame

2.1 Difference between data.frame and DataFrame

A DataFrame is a Spark object that allows us to do computations on distributed storage directly from R. We can create a DataFrame object from a standard data.frame as follows.

# mtcars is an ordinary in-memory R data.frame.
class(mtcars)
## [1] "data.frame"
# createDataFrame copies it into Spark; the commented line shows the
# equivalent call using a plain SQL context instead of the Hive context.
##df_mtcars <- createDataFrame(sqlContext, mtcars)
df_mtcars <- createDataFrame(hiveContext, mtcars)
class(df_mtcars)
## [1] "DataFrame"
## attr(,"package")
## [1] "SparkR"
# Printing a DataFrame shows only its schema, not its rows.
df_mtcars
## DataFrame[mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]
# head() fetches the first rows from Spark into R.
head(df_mtcars)
##    mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
## 2 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
## 3 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
## 4 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
## 5 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
## 6 18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

Note that this DataFrame object is not in the R workspace; it lives entirely in Spark.

2.2 Different ways to do a simple aggregation

# Classic nested-call style: filter rows where cyl = 6, then count them.
count(filter(df_mtcars, "cyl = 6"))
## [1] 7

Method chaining (piping)

# The same aggregation written with pipeR's %>>% pipe operator.
df_mtcars %>>%
  filter("cyl = 6") %>>%
  count
## [1] 7

Querying

# The same aggregation once more, via SQL: register the DataFrame as a
# temporary table, list the tables known to the context, then query it.
df_mtcars %>>%
  registerTempTable("mtcars")


hiveContext %>>%
  tables %>>%
  collect
##   tableName isTemporary
## 1    mtcars        TRUE
sql(hiveContext, "select count(*) from mtcars where cyl = 6") %>>% collect
##   _c0
## 1   7

2.3 Lazy Execution

Spark does not execute calculations until we ask for results. Example:

# Read train.csv through the spark-csv source; inferSchema asks Spark to
# detect column types instead of treating everything as strings.
path1 <- "data/train.csv"
df1 <- read.df(hiveContext, path1,
               source = "com.databricks.spark.csv",
               header="true",
               inferSchema = "true")

head(df1)
##   Store DayOfWeek       Date Sales Customers Open Promo StateHoliday
## 1     1         5 2015-07-31  5263       555    1     1            0
## 2     2         5 2015-07-31  6064       625    1     1            0
## 3     3         5 2015-07-31  8314       821    1     1            0
## 4     4         5 2015-07-31 13995      1498    1     1            0
## 5     5         5 2015-07-31  4822       559    1     1            0
## 6     6         5 2015-07-31  5651       589    1     1            0
##   SchoolHoliday
## 1             1
## 2             1
## 3             1
## 4             1
## 5             1
## 6             1
# Defining the aggregation is nearly instantaneous (~0.02 s elapsed):
# nothing is actually computed yet — this only builds the execution plan.
p <- proc.time()
df1_store3 <-
  df1 %>>% 
  groupBy("Date") %>>%
  agg(Sales="sum")
proc.time()-p
##    user  system elapsed 
##   0.004   0.000   0.018
# Asking for rows (collect) triggers the real computation (~5.6 s elapsed).
p <- proc.time()
df1_store3 %>>%
  limit(10) %>>%
  collect
##          Date sum(Sales)
## 1  2015-07-14    9340405
## 2  2015-04-08    6790649
## 3  2014-01-20   10648819
## 4  2013-12-10    7458243
## 5  2013-08-06    6366382
## 6  2015-07-15    8756137
## 7  2015-04-09    6560475
## 8  2014-01-21    8475207
## 9  2013-12-11    7364610
## 10 2013-08-07    6414477
proc.time()-p
##    user  system elapsed 
##   0.008   0.000   5.567
# Every new action re-runs the plan from scratch (~7.2 s elapsed here) —
# motivation for caching in the next section.
p <- proc.time()
df1_store3 %>>%
  count
## [1] 942
proc.time()-p
##    user  system elapsed 
##   0.004   0.000   7.152

2.4 Cache

# cache() marks the DataFrame for in-memory persistence; the data is
# materialized the first time an action runs on it.
df1_store3_cache <-
  df1_store3 %>>% 
  cache

# First action: computes the aggregation and fills the cache (~5.5 s).
p <- proc.time()
df1_store3_cache %>>%
  limit(10) %>>%
  collect
##          Date sum(Sales)
## 1  2015-07-14    9340405
## 2  2015-04-08    6790649
## 3  2014-01-20   10648819
## 4  2013-12-10    7458243
## 5  2013-08-06    6366382
## 6  2015-07-15    8756137
## 7  2015-04-09    6560475
## 8  2014-01-21    8475207
## 9  2013-12-11    7364610
## 10 2013-08-07    6414477
proc.time()-p
##    user  system elapsed 
##   0.008   0.000   5.502
# Second action is served from the cache (~3.1 s vs ~7.2 s uncached above).
p <- proc.time()
df1_store3_cache %>>%
  count
## [1] 942
proc.time()-p
##    user  system elapsed 
##   0.004   0.000   3.105
# Release the cached data when it is no longer needed.
df1_store3_cache %>>% unpersist()
## DataFrame[Date:string, sum(Sales):bigint]

2.5 Basic manipulations in SparkR

Now it's time to meet a few SparkR functions. There are more of them, and they can be found in the SparkR documentation.

https://spark.apache.org/docs/latest/api/R/index.html

  • describe
  • filter
  • select
  • distinct
  • mutate (withColumn)
  • collect
  • join

First we load our datasets.

# Paths to the two example CSVs used throughout this section:
# per-state properties, and a state -> census-division lookup table.
states_properties_path <- "data/states_properties.csv"

states_division_path <- "data/states_division.csv"

# Read both through the spark-csv source, letting Spark infer column types.
sp_df <- read.df(hiveContext, states_properties_path,
               source = "com.databricks.spark.csv",
               header="true",
               inferSchema = "true")

sd_df <- read.df(hiveContext, states_division_path,
               source = "com.databricks.spark.csv",
               header="true",
               inferSchema = "true")

Describe

# describe() computes count/mean/stddev/min/max per column (NA where a
# statistic does not apply); collect brings the small summary into R.
sp_df %>>%
  describe %>>%
  collect
##   summary c("50", NA, NA, "Alabama", "Wyoming") state_abb
## 1   count                                    50        50
## 2    mean                                  <NA>      <NA>
## 3  stddev                                  <NA>      <NA>
## 4     min                               Alabama        AK
## 5     max                               Wyoming        WY
##          Population            Income         Illiteracy
## 1                50                50                 50
## 2           4246.42            4435.8 1.1700000000000002
## 3 4419.621033934924 608.2942051343247 0.6034069936618229
## 4               365              3098                0.5
## 5             21198              6315                2.8
##             Life_Exp            Murder           HS_Grad             Frost
## 1                 50                50                50                50
## 2            70.8786 7.377999999999998 53.10800000000002            104.46
## 3 1.3289018172904592 3.654437850066686 7.995819907926756 51.45841427793905
## 4              67.96               1.4              37.8                 0
## 5               73.6              15.1              67.3               188
##       Area
## 1       50
## 2 70735.88
## 3      NaN
## 4     1049
## 5   566432

Select, distinct

# Number of distinct census divisions in the lookup table.
sd_df %>>%
  select(sd_df$state_division) %>>%
  distinct %>>%
  count
## [1] 9

Mutate

# mutate adds a derived column: area converted from square miles to km^2.
sp_df %>>%
  mutate(Area_km2= (.$Area * 2.58999)) %>>%
  head
##   c("Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado"
## 1                                                                Alabama
## 2                                                                 Alaska
## 3                                                                Arizona
## 4                                                               Arkansas
## 5                                                             California
## 6                                                               Colorado
##   state_abb Population Income Illiteracy Life_Exp Murder HS_Grad Frost
## 1        AL       3615   3624        2.1    69.05   15.1    41.3    20
## 2        AK        365   6315        1.5    69.31   11.3    66.7   152
## 3        AZ       2212   4530        1.8    70.55    7.8    58.1    15
## 4        AR       2110   3378        1.9    70.66   10.1    39.9    65
## 5        CA      21198   5114        1.1    71.71   10.3    62.6    20
## 6        CO       2541   4884        0.7    72.06    6.8    63.9   166
##     Area  Area_km2
## 1  50708  131333.2
## 2 566432 1467053.2
## 3 113417  293748.9
## 4  51945  134537.0
## 5 156361  404973.4
## 6 103766  268752.9
# mutate returns a NEW DataFrame; the original sp_df is unchanged
# (note the missing Area_km2 column below).
sp_df %>>%
  head
##   c("Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado"
## 1                                                                Alabama
## 2                                                                 Alaska
## 3                                                                Arizona
## 4                                                               Arkansas
## 5                                                             California
## 6                                                               Colorado
##   state_abb Population Income Illiteracy Life_Exp Murder HS_Grad Frost
## 1        AL       3615   3624        2.1    69.05   15.1    41.3    20
## 2        AK        365   6315        1.5    69.31   11.3    66.7   152
## 3        AZ       2212   4530        1.8    70.55    7.8    58.1    15
## 4        AR       2110   3378        1.9    70.66   10.1    39.9    65
## 5        CA      21198   5114        1.1    71.71   10.3    62.6    20
## 6        CO       2541   4884        0.7    72.06    6.8    63.9   166
##     Area
## 1  50708
## 2 566432
## 3 113417
## 4  51945
## 5 156361
## 6 103766

Join

# Join the state properties (with the derived Area_km2 column) to the
# division lookup table on the two-letter state abbreviation.
s_df <-
  sp_df %>>%
  mutate(Area_km2= (.$Area * 2.58999)) %>>%
  join(sd_df, .$state_abb == sd_df$state_abb)

# Note in the output that BOTH state_abb columns survive the join.
head(s_df)
##   c("Mississippi", "Montana", "Tennessee", "North Carolina", "North Dakota", 
## 1                                                                 Mississippi
## 2                                                                     Montana
## 3                                                                   Tennessee
## 4                                                              North Carolina
## 5                                                                North Dakota
## 6                                                                    Nebraska
##   state_abb Population Income Illiteracy Life_Exp Murder HS_Grad Frost
## 1        MS       2341   3098        2.4    68.09   12.5    41.0    50
## 2        MT        746   4347        0.6    70.56    5.0    59.2   155
## 3        TN       4173   3821        1.7    70.11   11.0    41.8    70
## 4        NC       5441   3875        1.8    69.21   11.1    38.5    80
## 5        ND        637   5087        0.8    72.78    1.4    50.3   186
## 6        NE       1544   4508        0.6    72.60    2.9    59.3   139
##     Area Area_km2 c(24L, 26L, 42L, 33L, 34L, 27L) state_abb
## 1  47296 122496.2                              24        MS
## 2 145587 377068.9                              26        MT
## 3  41328 107039.1                              42        TN
## 4  48798 126386.3                              33        NC
## 5  69273 179416.4                              34        ND
## 6  76483 198090.2                              27        NE
##       state_division
## 1 East South Central
## 2           Mountain
## 3 East South Central
## 4     South Atlantic
## 5 West North Central
## 6 West North Central

GroupBy and agg

# Per-division aggregates: maximum income and total area.
# NOTE(review): `s_df$income` is lower case while the column is named
# `Income` everywhere else; it works here, presumably because Spark SQL
# resolves column names case-insensitively by default — TODO confirm.
d_df <-
  s_df %>>%
  groupBy("state_division") %>>%
  agg(max_income=max(s_df$income), area=sum(s_df$Area))

collect(d_df)
##       state_division max_income   area
## 1    Middle Atlantic       5237 100318
## 2 East South Central       3821 178982
## 3 East North Central       5107 244101
## 4     South Atlantic       5299 266909
## 5            Pacific       6315 891972
## 6           Mountain       5149 856047
## 7 West South Central       4188 427791
## 8 West North Central       5087 507723
## 9        New England       5348  62951
# avg() is a shortcut for per-group means over the listed columns.
s_df %>>%
  groupBy("state_division") %>>%
  avg("Area", "Income") %>>%
  collect()
##       state_division avg(Area) avg(Income)
## 1    Middle Atlantic  33439.33    4863.000
## 2 East South Central  44745.50    3563.750
## 3 East North Central  48820.20    4669.000
## 4     South Atlantic  33363.62    4355.250
## 5            Pacific 178394.40    5183.200
## 6           Mountain 107005.88    4402.250
## 7 West South Central 106947.75    3773.500
## 8 West North Central  72531.86    4569.714
## 9        New England  10491.83    4423.833

Mixing queries and transformations

# Register the joined DataFrame as a temporary table, compute a derived
# column in SQL, then finish the aggregation with DataFrame verbs —
# the two styles compose freely.
registerTempTable(s_df, "s_table")

s2_df <-
  sql(hiveContext,
      "SELECT state_division, Population, Income * Population AS total_income FROM s_table")

s2_df %>>%
  groupBy(.$state_division) %>>%
  agg(total_income=sum(s2_df$total_income)) %>>%
  collect
##       state_division total_income
## 1    Middle Atlantic    179794689
## 2 East South Central     48870755
## 3 East North Central    193620781
## 4     South Atlantic    145896064
## 5            Pacific    142973847
## 6           Mountain     42734949
## 7 West South Central     82682251
## 8 West North Central     75534019
## 9        New England     57696396

3. Examples

3.1 Reading folder with csvs

# Reading a single CSV file: 41088 rows.
path <- "data/subdata/test.csv"

data <- read.df(hiveContext, path,
               source = "com.databricks.spark.csv",
               header="true",
               inferSchema = "true")
count(data)
## [1] 41088
# Pointing read.df at the FOLDER reads every file in it: the count grows
# to 164352 (= 4 x 41088, so presumably four equally sized CSVs).
path <- "data/subdata/"

data <- read.df(hiveContext, path,
               source = "com.databricks.spark.csv",
               header="true",
               inferSchema = "true")
count(data)
## [1] 164352

3.2 Example with Parquet

# A plain SQL context (in addition to the Hive context used earlier).
sqlContext <- sparkRSQL.init(sc)

# No source argument is given — presumably Parquet is the default source
# here; the printed schemas below confirm the files were read.
train_df <-
  sqlContext %>>%
  read.df("data/trainParquet")

head(train_df)
##   Store DayOfWeek       Date Sales Customers Open Promo StateHoliday
## 1     1         5 2015-07-31  5263       555    1     1            0
## 2     2         5 2015-07-31  6064       625    1     1            0
## 3     3         5 2015-07-31  8314       821    1     1            0
## 4     4         5 2015-07-31 13995      1498    1     1            0
## 5     5         5 2015-07-31  4822       559    1     1            0
## 6     6         5 2015-07-31  5651       589    1     1            0
##   SchoolHoliday
## 1             1
## 2             1
## 3             1
## 4             1
## 5             1
## 6             1
# printSchema shows column names, types, and nullability; note that every
# column in these Parquet files is stored as string.
printSchema(train_df)
## root
##  |-- Store: string (nullable = true)
##  |-- DayOfWeek: string (nullable = true)
##  |-- Date: string (nullable = true)
##  |-- Sales: string (nullable = true)
##  |-- Customers: string (nullable = true)
##  |-- Open: string (nullable = true)
##  |-- Promo: string (nullable = true)
##  |-- StateHoliday: string (nullable = true)
##  |-- SchoolHoliday: string (nullable = true)
test_df <-
  sqlContext %>>%
  read.df("data/testParquet")

printSchema(test_df)
## root
##  |-- Id: string (nullable = true)
##  |-- Store: string (nullable = true)
##  |-- DayOfWeek: string (nullable = true)
##  |-- Date: string (nullable = true)
##  |-- Open: string (nullable = true)
##  |-- Promo: string (nullable = true)
##  |-- StateHoliday: string (nullable = true)
##  |-- SchoolHoliday: string (nullable = true)

3.3. Exploratory Analysis

Question 1: How many stores are in train?

# Count distinct store ids in the training data.
train_df %>>%
  select("Store") %>>% distinct %>>% count
## [1] 1115

Question 2: Which day has the highest number of sales between January 2014 and July 2014? Plot evolution.

# Filter Jan-Jul 2014, sum sales per day in Spark, collect the small
# result into R, then plot it with ggplot2. The commented-out line shows
# the equivalent SQL-string form of the same filter.
train_df %>>%
  filter(.$Date <= '2014-07-31'  & .$Date >= '2014-01-01') %>>%
  #filter("Date <= '2014-07-31' AND Date >= '2014-01-01'") %>>%
  groupBy("Date") %>>%
  agg(Sales=sum(train_df$Sales)) %>>%
  collect %>>%
  ggplot() +
  geom_bar(aes(as.Date(Date), Sales), stat="identity") +
  xlab("Date") +
  theme_bw()

Question 3: Which day of the week has the highest number of promotions?

# Count promotions per day of week, largest first.
# NOTE(review): only days 1-5 appear in the result — presumably promotions
# run on weekdays only; verify against the data.
train_df %>>%
  filter("Promo = 1") %>>%
  groupBy("DayOfWeek") %>>%
  agg(Promo=sum(train_df$Promo)) %>>%
  orderBy(desc(.$Promo)) %>>%
  collect
##   DayOfWeek Promo
## 1         1 77760
## 2         2 77580
## 3         3 77580
## 4         4 77580
## 5         5 77580

Question 4: How to use parameters in queries.

# Register the training DataFrame so it can be queried with SQL.
train_df %>>%
  registerTempTable("train")

# A fixed query with the grouping column hard-coded.
query <- "SELECT COUNT(*) FROM train GROUP BY  Date"

sql(sqlContext, query) %>>%
  head
##    _c0
## 1 1115
## 2 1115
## 3 1115
## 4 1115
## 5 1115
## 6 1115
# Parameterized variant: whisker.render substitutes {{{col}}} from a named
# list, producing the same query (and thus the same result) as above.
query_template <-
  "SELECT COUNT(*) FROM train GROUP BY  {{{col}}}"

data <- list( col="Date")
query <- whisker.render(query_template, data)

sql(sqlContext, query) %>>%
  head
##    _c0
## 1 1115
## 2 1115
## 3 1115
## 4 1115
## 5 1115
## 6 1115

3.4 ML

# NOTE(review): as written, this expression only DEFINES an anonymous
# function — it is never executed. R parses the trailing `()` as part of
# the function body (not as a call), so evaluating the expression merely
# auto-prints the definition — which is exactly what the echoed output
# below shows. To actually run the example, wrap the definition in
# parentheses: (function() { ... })().
function() {
  library(MASS)

  # Copy the Boston housing data into Spark and fit a linear model
  # (glm with Gaussian family) of median value on lower-status percentage.
  boston_df <- createDataFrame(hiveContext, Boston)
  lm.fit <-
    glm(medv~lstat,
        data=boston_df,
        family = "gaussian")

  summary(lm.fit)

  # Extract fitted coefficients to overlay the regression line on a
  # scatter plot of a 10% sample of the data.
  intercept <- summary(lm.fit)$coefficients[1]
  slope <- summary(lm.fit)$coefficients[2]

  boston_df %>>%
    sample(FALSE, 0.1) %>>%
    collect %>>%
    ggplot() + geom_point(aes(lstat, medv)) + geom_abline(intercept=intercept, slope=slope)


  # A second model with an additional predictor.
  lm.fit=glm(medv~lstat+age, data=boston_df,  family = "gaussian")

  summary(lm.fit)

  #library(ISLR)
  #Smarket_df <- createDataFrame(hiveContext, Smarket)
  #glm.fit <-  glm(Direction~Lag1 + Lag2 + Lag3 + Lag4 + Lag5 + Volume,
  #               data=Smarket_df, family = "binomial" )
}()
## function() {
##   library(MASS)
## 
##   boston_df <- createDataFrame(hiveContext, Boston)
##   lm.fit <-
##     glm(medv~lstat,
##         data=boston_df,
##         family = "gaussian")
## 
##   summary(lm.fit)
## 
##   intercept <- summary(lm.fit)$coefficients[1]
##   slope <- summary(lm.fit)$coefficients[2]
## 
##   boston_df %>>%
##     sample(FALSE, 0.1) %>>%
##     collect %>>%
##     ggplot() + geom_point(aes(lstat, medv)) + geom_abline(intercept=intercept, slope=slope)
## 
## 
##   lm.fit=glm(medv~lstat+age, data=boston_df,  family = "gaussian")
## 
##   summary(lm.fit)
## 
##   #library(ISLR)
##   #Smarket_df <- createDataFrame(hiveContext, Smarket)
##   #glm.fit <-  glm(Direction~Lag1 + Lag2 + Lag3 + Lag4 + Lag5 + Volume,
##   #               data=Smarket_df, family = "binomial" )
## }()

Conclusions

Don’t forget to stop Spark, especially if you are working in the cloud.

# Shut down the Spark context and release its resources.
sparkR.stop()